import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';

/// A [Stream] wrapper that records how often [listen] has been called.
///
/// Each subscription request is counted and then forwarded, with all of its
/// arguments untouched, to the wrapped [stream].
class MockStream<T> extends Stream<T> {
  /// Wraps [stream] so that calls to [listen] can be counted.
  MockStream(this.stream);

  /// The stream that receives every forwarded [listen] call.
  final Stream<T> stream;

  /// How many times [listen] has been invoked on this wrapper.
  int listenCount = 0;

  @override
  StreamSubscription<T> listen(
    void Function(T event)? onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) {
    // Count first: the call is recorded even if the delegate throws.
    listenCount += 1;
    final subscription = stream.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
    return subscription;
  }
}

/// Test page that registers the `ValueConnectableStream` scenarios.
///
/// Every case is declared in the constructor through the `group`, `test` and
/// `expect` helpers. Those helpers are not defined in this file (presumably
/// they are inherited from [TestPage] — confirm). Throughout this page
/// `expect` is called with a single argument: the value under inspection. No
/// expected value or matcher is passed.
class ValueConnectableStreamTestPage extends TestPage {
  /// Registers all cases under the 'BehaviorConnectableStream' group.
  ///
  /// [title] is forwarded unchanged to the [TestPage] constructor.
  ValueConnectableStreamTestPage(super.title) {
    group('BehaviorConnectableStream', () {
      test('should not emit before connecting', () {
        final stream = MockStream<int>(Stream.fromIterable(const [1, 2, 3]));
        final connectableStream = ValueConnectableStream(stream);

        // Reported before and after connect() so both counts are visible.
        expect(stream.listenCount);
        connectableStream.connect();
        expect(stream.listenCount);
      });

      test('should begin emitting items after connection', () {
        var count = 0;
        const items = [1, 2, 3];
        final stream = ValueConnectableStream(Stream.fromIterable(items));

        stream.connect();

        expect(stream);
        stream.listen((i) {
          // The braces matter: '$stream.value' would interpolate `stream`
          // itself and then append the literal text '.value'.
          expect('${stream.value}, ${items[count]}');
          count++;
        });
      });

      test('stops emitting after the connection is cancelled', () async {
        final stream = Stream.fromIterable(const [1, 2, 3]).publishValue();

        stream.connect().cancel(); // ignore: unawaited_futures

        expect(stream);
      });

      test('stops emitting after the last subscriber unsubscribes', () async {
        final stream = Stream.fromIterable(const [1, 2, 3]).shareValue();

        stream.listen(null).cancel(); // ignore: unawaited_futures

        expect(stream);
      });

      test('keeps emitting with an active subscription', () async {
        final stream = Stream.fromIterable(const [1, 2, 3]).shareValue();

        // The first subscription is intentionally left active.
        stream.listen(null);
        stream.listen(null).cancel(); // ignore: unawaited_futures

        expect(stream);
      });

      test('multicasts a single-subscription stream', () async {
        final stream = ValueConnectableStream(
          Stream.fromIterable(const [1, 2, 3]),
        ).autoConnect();

        expect(stream);
        expect(stream);
        expect(stream);
      });

      test('replays the latest item', () async {
        final stream = ValueConnectableStream(
          Stream.fromIterable(const [1, 2, 3]),
        ).autoConnect();

        expect(stream);
        expect(stream);
        expect(stream);

        await Future<void>.delayed(const Duration(milliseconds: 200));

        expect(stream);
      });

      test('replays the seeded item', () async {
        final stream = ValueConnectableStream.seeded(
          StreamController<int>().stream,
          3,
        ).autoConnect();

        expect(stream);
        expect(stream);
        expect(stream);

        await Future<void>.delayed(const Duration(milliseconds: 200));

        expect(stream);
      });

      test('replays the seeded null item', () async {
        final stream = ValueConnectableStream.seeded(
          StreamController<int>().stream,
          null,
        ).autoConnect();

        expect(stream);
        expect(stream);
        expect(stream);

        await Future<void>.delayed(const Duration(milliseconds: 200));

        expect(stream);
      });

      test('can multicast streams', () async {
        final stream = Stream.fromIterable(const [1, 2, 3]).shareValue();

        expect(stream);
        expect(stream);
        expect(stream);
      });

      test('transform Stream with initial value', () async {
        final stream = Stream.fromIterable(const [1, 2, 3]).shareValueSeeded(0);

        expect(stream.value);
        expect(stream);
      });

      test('provides access to the latest value', () async {
        const items = [1, 2, 3];
        var count = 0;
        final stream = Stream.fromIterable(const [1, 2, 3]).shareValue();

        stream.listen((data) {
          expect('$data, ${items[count]}');
          count++;
          // Once every item has been seen, also report the cached value.
          if (count == items.length) {
            expect(stream.value);
          }
        });
      });

      test('provides access to the latest error', () async {
        final source = StreamController<int>();
        final stream = ValueConnectableStream(source.stream).autoConnect();

        source.sink.add(1);
        source.sink.add(2);
        source.sink.add(3);
        source.sink.addError(Exception('error'));

        stream.listen(
          null,
          onError: (Object error) {
            expect(stream.valueOrNull);
            expect(stream.value);
            expect(stream.hasValue);

            expect(stream.errorOrNull);
            expect(stream.error);
            expect(stream.hasError);
          },
        );
      });

      test('provide a function to autoconnect that stops listening', () async {
        final stream = Stream.fromIterable(const [1, 2, 3])
            .publishValue()
            .autoConnect(connection: (subscription) => subscription.cancel());

        expect(await stream.isEmpty);
      });

      test(
        'refCount cancels source subscription when no listeners remain',
        () async {
          // Unseeded variant.
          {
            var isCanceled = false;

            final controller =
                StreamController<void>(onCancel: () => isCanceled = true);
            final stream = controller.stream.shareValue();

            final subscription = stream.listen(null);

            await subscription.cancel();
            expect(isCanceled);
          }

          // Seeded variant.
          {
            var isCanceled = false;

            final controller =
                StreamController<void>(onCancel: () => isCanceled = true);
            final stream = controller.stream.shareValueSeeded(null);

            final subscription = stream.listen(null);

            await subscription.cancel();
            expect(isCanceled);
          }
        },
      );

      test('can close shareValue() stream', () async {
        // Unseeded variant.
        {
          final isCanceled = Completer<void>();

          final controller = StreamController<bool>();
          controller.stream
              .shareValue()
              .doOnCancel(() => isCanceled.complete())
              .listen(null);

          controller.add(true);
          await Future<void>.delayed(Duration.zero);
          await controller.close();

          expect(isCanceled.future);
        }

        // Seeded variant.
        {
          final isCanceled = Completer<void>();

          final controller = StreamController<bool>();
          controller.stream
              .shareValueSeeded(false)
              .doOnCancel(() => isCanceled.complete())
              .listen(null);

          controller.add(true);
          await Future<void>.delayed(Duration.zero);
          await controller.close();

          expect(isCanceled.future);
        }
      });

      test(
        'throws StateError when mixing autoConnect, connect and refCount together',
        () {
          // Builds a fresh connectable stream for each combination below.
          ValueConnectableStream<int> stream() =>
              Stream.value(1).publishValue();

          // The closures are handed to `expect` without being invoked here;
          // whether `expect` calls them cannot be seen from this file.
          expect(
            () => stream()
              ..autoConnect()
              ..connect(),
          );
          expect(
            () => stream()
              ..autoConnect()
              ..refCount(),
          );
          expect(
            () => stream()
              ..connect()
              ..refCount(),
          );
        },
      );

      test('calling autoConnect() multiple times returns the same value', () {
        final s = Stream.value(1).publishValueSeeded(1);
        expect(s.autoConnect());
        expect(s.autoConnect());
      });

      test('calling connect() multiple times returns the same value', () {
        final s = Stream.value(1).publishValueSeeded(1);
        expect(s.connect());
        expect(s.connect());
      });

      test('calling refCount() multiple times returns the same value', () {
        final s = Stream.value(1).publishValueSeeded(1);
        expect(s.refCount());
        expect(s.refCount());
      });
    });
  }
}